package com.gl.sass.mq.producer.retry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.function.Consumer;

/**
 * 
 * @author xiehong
 *
 * @param <E>
 */
public class LimitDelayQueue<E extends Delayed> {

    private static final Logger log = LoggerFactory.getLogger(LimitDelayQueue.class);

    private DelayQueue<E> delayQueue;

    private List<Consumer<E>> consumers;

    private int capacity;

    public LimitDelayQueue(int capacity) {
        this.capacity = capacity;
        delayQueue = new DelayQueue<>();
        this.consumers = new LinkedList<>();
    }

    public boolean add(E delayed) {
        return checkCapacityAdd(delayed);
    }

    public E take() {
        try {
            return delayQueue.take();
        } catch (InterruptedException e) {
            Thread.interrupted();
            log.info(e.getMessage(), e);
        }
        return null;
    }

    public E peek() {
        return delayQueue.peek();
    }

    public void registConsumer(Consumer<E> consumer) {
        this.consumers.add(consumer);
    }

    public boolean isEmpty() {
        return delayQueue.isEmpty();
    }

    private boolean checkCapacityAdd(E delayed) {
        try {
            if (delayQueue.size() >= capacity) {
                E e = delayQueue.peek();
                if (Objects.nonNull(e)) {
                    consumers.forEach(o -> o.accept(e));
                    delayQueue.remove(e);
                }
            }
            return delayQueue.add(delayed);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return false;
    }

    public int size() {
        return delayQueue.size();
    }
}
